/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.stream.socket;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHALLER_BLACKLIST;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHALLER_WHITELIST;

/**
 * Tests that the marshaller whitelist and blacklist protect {@link SocketStreamer} against
 * a deserialization vulnerability.
 * <p>
 * Every test sends a marshalled {@link Exploit} instance to a running {@link SocketStreamer} and then checks
 * whether {@code Exploit.readObject} was executed, i.e. whether the object was actually deserialized.
 */
public class SocketStreamerUnmarshalVulnerabilityTest extends GridCommonAbstractTest {
    /** Class list file used by the "included" tests (relative to the Ignite home directory). */
    private static final String INCLUDED_LIST = "modules/core/src/test/config/class_list_exploit_included.txt";

    /** Class list file used by the "excluded" tests (relative to the Ignite home directory). */
    private static final String EXCLUDED_LIST = "modules/core/src/test/config/class_list_exploit_excluded.txt";

    /** How long to wait for the exploit to fire, in milliseconds. */
    private static final long WAIT_TIMEOUT = 3000L;

    /** Flag raised by {@link Exploit} when it gets deserialized. */
    private static final AtomicBoolean SHARED = new AtomicBoolean();

    /** Port the socket streamer listens on; re-picked on every {@link #getConfiguration(String)} call. */
    private static int port;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        // Bind an ephemeral port just to learn a free port number. The socket is closed right away,
        // so the port is only very likely (not guaranteed) to still be free when the streamer binds it.
        try (ServerSocket sock = new ServerSocket(0)) {
            port = sock.getLocalPort();
        }

        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setCacheConfiguration(defaultCacheConfiguration());

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        SHARED.set(false);

        resetMarshallerLists();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // Do not leak the whitelist/blacklist set by a test into whatever runs next in this JVM.
        resetMarshallerLists();

        super.afterTest();
    }

    /**
     * No lists are configured: the exploit is expected to be deserialized.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoLists() throws Exception {
        testExploit(true);
    }

    /**
     * Whitelist points to the "included" class list: the exploit is expected to be deserialized.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWhiteListIncluded() throws Exception {
        System.setProperty(IGNITE_MARSHALLER_WHITELIST, classListPath(INCLUDED_LIST));

        testExploit(true);
    }

    /**
     * Whitelist points to the "excluded" class list: the exploit is expected to be rejected.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWhiteListExcluded() throws Exception {
        System.setProperty(IGNITE_MARSHALLER_WHITELIST, classListPath(EXCLUDED_LIST));

        testExploit(false);
    }

    /**
     * Blacklist points to the "included" class list: the exploit is expected to be rejected.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testBlackListIncluded() throws Exception {
        System.setProperty(IGNITE_MARSHALLER_BLACKLIST, classListPath(INCLUDED_LIST));

        testExploit(false);
    }

    /**
     * Blacklist points to the "excluded" class list: the exploit is expected to be deserialized.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testBlackListExcluded() throws Exception {
        System.setProperty(IGNITE_MARSHALLER_BLACKLIST, classListPath(EXCLUDED_LIST));

        testExploit(true);
    }

    /**
     * Both whitelist and blacklist point to the "included" class list: the exploit is expected to be rejected.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testBothListIncluded() throws Exception {
        String path = classListPath(INCLUDED_LIST);

        System.setProperty(IGNITE_MARSHALLER_WHITELIST, path);
        System.setProperty(IGNITE_MARSHALLER_BLACKLIST, path);

        testExploit(false);
    }

    /**
     * Starts a grid with a socket streamer, sends a marshalled {@link Exploit} to it and checks whether
     * the exploit was deserialized.
     *
     * @param positive {@code True} if the exploit is expected to be deserialized, {@code false} if it is
     *      expected to be rejected.
     * @throws Exception If failed.
     */
    private void testExploit(boolean positive) throws Exception {
        try {
            Ignite ignite = startGrid();

            try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
                stmr.allowOverwrite(true);
                stmr.autoFlushFrequency(10);

                SocketStreamer<Exploit, Integer, String> sockStmr = new SocketStreamer<>();

                sockStmr.setIgnite(ignite);
                sockStmr.setStreamer(stmr);
                sockStmr.setPort(port);

                sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exploit, Integer, String>() {
                    @Override public Map.Entry<Integer, String> extract(Exploit msg) {
                        return new IgniteBiTuple<>(1, "val");
                    }
                });

                try {
                    sockStmr.start();

                    sendExploit();

                    boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
                        @Override public boolean apply() {
                            return SHARED.get();
                        }
                    }, WAIT_TIMEOUT);

                    if (positive)
                        assertTrue("Exploit was expected to be deserialized, but it was not.", res);
                    else
                        assertFalse("Exploit was deserialized although it was expected to be rejected.", res);
                }
                finally {
                    // Stop the socket streamer while the data streamer is still open: a 'finally' attached
                    // to the try-with-resources itself would run only after the data streamer is closed.
                    sockStmr.stop();
                }
            }
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Connects to the socket streamer port and sends a single marshalled {@link Exploit} as a
     * length-prefixed frame: 4 bytes of payload length (most significant byte first), then the payload.
     *
     * @throws IgniteException If marshalling or sending failed.
     */
    private static void sendExploit() {
        try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
             OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
            Marshaller marsh = new JdkMarshaller();

            byte[] msg = marsh.marshal(new Exploit());

            // OutputStream.write(int) emits only the low-order byte of its argument.
            os.write(msg.length >>> 24);
            os.write(msg.length >>> 16);
            os.write(msg.length >>> 8);
            os.write(msg.length);

            os.write(msg);
        }
        catch (IOException | IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    /**
     * Removes both marshaller class list properties and drops the cached classes so that the next
     * test (or test class) starts from a clean state.
     */
    private static void resetMarshallerLists() {
        System.clearProperty(IGNITE_MARSHALLER_WHITELIST);
        System.clearProperty(IGNITE_MARSHALLER_BLACKLIST);

        IgniteUtils.clearClassCache();
    }

    /**
     * Resolves a class list file and fails with a readable message if it cannot be found.
     *
     * @param relPath File path as accepted by {@code U.resolveIgnitePath}.
     * @return Path of the resolved file.
     */
    private static String classListPath(String relPath) {
        File file = U.resolveIgnitePath(relPath);

        assertNotNull("Failed to resolve class list file: " + relPath, file);

        return file.getPath();
    }

    /** Serializable payload whose deserialization has an observable side effect. */
    private static class Exploit implements Serializable {
        /**
         * Deserialization hook of Java serialization: raises the shared flag so the test can tell
         * that this object was deserialized.
         *
         * @param is Input stream.
         * @throws ClassNotFoundException Declared by the hook signature; never thrown here.
         * @throws IOException Declared by the hook signature; never thrown here.
         */
        private void readObject(ObjectInputStream is) throws ClassNotFoundException, IOException {
            SHARED.set(true);
        }
    }
}
